
Changes towards an engine that could help in Neurolang #1

Open

wants to merge 39 commits into base: main

Conversation

@demianw (Member) commented Feb 7, 2021

Added

  • Implementation of SQL Except (semantics sketched below)
  • Better compatibility with pyHive connector for SQLAlchemy
  • Some new optimisation rules
  • Included some new debugging
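For reference, SQL EXCEPT keeps the distinct rows of the left input that have no match in the right input. Here is a minimal sketch of those semantics in plain pandas with made-up data; the same merge-and-filter logic maps onto Dask dataframes, though this is not necessarily how the plugin itself implements it:

```python
import pandas as pd

# Toy inputs, just to illustrate the semantics.
left = pd.DataFrame({"a": [1, 1, 2, 3]})
right = pd.DataFrame({"a": [2]})

# left EXCEPT right: distinct rows of `left` with no match in `right`.
merged = left.merge(right, how="left", on=list(left.columns), indicator=True)
result = (
    merged[merged["_merge"] == "left_only"]
    .drop(columns="_merge")
    .drop_duplicates()
)
print(result)  # rows with a = 1 and a = 3
```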

demianw and others added 30 commits February 7, 2021 18:24
commit e5fac1a
Author: Nils Braun <[email protected]>
Date:   Sun Feb 7 16:20:55 2021 +0100

    Aggregate improvements and SQL compatibility (dask-contrib#134)

    * A lot of refactoring of the groupby, mainly to include both distinct and null-grouping

    * Test for non-dask aggregations

    * All NaN data needs to go into the same partition (otherwise we cannot sort)

    * Fix compatibility with SQL on null-joins

    * Distinct is not needed, as it is optimized away by Calcite

    * Implement is not distinct

    * Describe new limitations and remove old ones

    * Added compatibility test from fugue

    * Added a test for sorting with multiple partitions and NaNs

    * Stylefix

commit 7273c2d
Author: Nils Braun <[email protected]>
Date:   Sun Feb 7 15:34:55 2021 +0100

    Docs improvements (dask-contrib#132)

    * Fixed a bug in function references in docs

    * More details on the dask-sql internals

commit bdc518e
Author: Nils Braun <[email protected]>
Date:   Sun Feb 7 14:19:50 2021 +0100

    Fix the fugue dependency (dask-contrib#133)
@nils-braun commented

Hi @demianw! Sorry for spying around on your fork :-)

I have seen that you made a lot of changes and added, in total, two new LogicalPlan implementations plus an additional materialized table implementation, which I think is quite interesting!
I would be very happy to have your ideas and changes on board, also in the main dask-sql repository. Is there anything you would like to port (now or later)? Is there anything I can help you with?
I have seen you have already created a first PR for the Except implementation, which is great!

I am happy to discuss with you any further collaboration :-)

@demianw (Member, Author) commented May 3, 2021

Hi @nils-braun, in fact @jonasrenault did most of the work. We have extended and adapted the functionality, but very much aimed at our use case, which is a Datalog implementation. Hence the planners might use a different set of rules than those better adapted to SQL semantics.

We would be happy to contribute more to dask-sql, but we should define a work plan, as our time availability for the dask-sql part is limited.

If you are up for it, we could discuss with @jonasrenault what makes sense to include in the main project, and how.

@jonasrenault commented

Hi @nils-braun ,

Thanks for the great library :) We have indeed adapted it recently to fit our specific use case, which is to benefit from Calcite's query optimizer when solving Datalog queries. As @demianw mentioned, we haven't tried to maintain full compatibility between dask-sql and SQL, since this isn't our priority, so some of the changes we've made probably shouldn't be integrated into dask-sql.

Here are the main points we worked on:

  • We've added the two plugins for intersect and minus, which I think I could easily clean up and get ready for merging into dask-sql.
  • We've refactored the aggregate and join plugins to support a larger range of cases, but at the cost of losing strict compatibility between dask-sql and SQL.
    • For joins I think it was mainly supporting more join conditions, as only column equality was supported in dask-sql.
    • For aggregates, we tried to support multi-column aggregations, as well as using the groupby().apply() method instead of creating dd.Aggregations, since we use a lot of custom user-provided aggregate functions (see the sketch after this list).
  • I also refactored the assign method in dask_sql/datacontainer.py. You use this method in a lot of places, often just to rename columns. This seemed to take up a lot of resources in Dask, as it assigns new columns instead of just renaming them. I'm not sure of the performance improvement (I haven't measured it), but this might be worth looking into, unless there's a reason for doing it this way that I missed.
  • We spent some time working on the Calcite part to figure out the right set of optimizer rules for our use case, but the lack of documentation for Calcite has made that part very difficult. I had to dig through the repositories of Calcite and Flink to find examples of how to do this. In the end, this is what worked best for us, but it is very specific to our use case.
  • The same goes for the MaterializedViews optimizer. Our intention was to be able to provide an SQL query along with the dataset whenever we create a table in the context, so that we can register the SQL query and table as a MaterializedView and let Calcite use it to optimize later queries. But again, I couldn't find any relevant documentation on the proper way to do this, and this part wasn't finished. We aren't using it at the moment, and I don't think it works right now. If you have some knowledge of Calcite, we would welcome your input :)
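To make the aggregate trade-off concrete, here is a minimal sketch with made-up data and a hypothetical custom score function, contrasting a dd.Aggregation (partition-wise partial results that get combined) with groupby().apply() (each complete group is handed to arbitrary Python code):

```python
import pandas as pd
import dask.dataframe as dd

df = dd.from_pandas(
    pd.DataFrame({"g": ["a", "a", "b"], "x": [1.0, 2.0, 3.0], "y": [4.0, 5.0, 6.0]}),
    npartitions=2,
)

# dd.Aggregation: computes partials per partition, then combines them.
# Well suited to reductions that decompose, like a sum:
total = dd.Aggregation(
    name="total",
    chunk=lambda s: s.sum(),        # per-partition partial sums
    agg=lambda parts: parts.sum(),  # combine the partials
)
print(df.groupby("g").agg({"x": total}).compute())

# groupby().apply(): each full group is shuffled to one worker and handed
# to arbitrary Python code -- slower, but it supports custom, multi-column
# user functions like this hypothetical score:
def score(group):
    return (group["x"] * group["y"]).sum()

print(df.groupby("g").apply(score, meta=("score", "f8")).compute())
```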

@nils-braun commented

Great answers and thoughts, both @demianw and @jonasrenault.
I am very glad you are considering contributing upstream; let's find out together what the best way of working together would be.
(side note: I haven't looked too deep into NeuroLang so far, but it looks really cool! You implemented a new language on top of Python, really nice)

We've added the two plugins for intersect and minus, which I think I could easily clean up and get ready for merging into dask-sql

That makes a lot of sense, and I think we can start here. There is already one PR open (dask-contrib#135) which just needs a bit of cleanup. I am very happy to support here, I just don't want to interfere with you (e.g. if you do not want to implement tests for all the edge cases, etc.).

For joins I think it was mainly supporting more join conditions, as only column equality was supported in dask-sql.

I think we have diverged a bit, because this is now also implemented in dask-sql. For "totally arbitrary" join conditions, a full cross join followed by a filter is unfortunately needed, but after dask-contrib#148 was fixed, this should at least be a bit more optimized... I do, however, like your logic to also parse the SqlCast operation.
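A rough sketch of the cross-join-then-filter strategy for a non-equi condition, with made-up data; dask-sql derives this plan from the Calcite output, so this only shows the general idea, not the actual implementation:

```python
import pandas as pd
import dask.dataframe as dd

left = dd.from_pandas(pd.DataFrame({"a": [1, 5, 10]}), npartitions=1)
right = dd.from_pandas(pd.DataFrame({"b": [3, 7]}), npartitions=1)

# Cross join: merge both sides on a constant helper key...
crossed = (
    left.assign(_key=1)
    .merge(right.assign(_key=1), on="_key")
    .drop(columns="_key")
)

# ...then apply the arbitrary join condition as a filter (here: a < b).
result = crossed[crossed["a"] < crossed["b"]]
print(result.compute())
```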

For aggregates, we tried to support multi-column aggregations

Ok, I see. Here I think it makes sense that you fork, because .apply() is slower for built-in functions (like max, sum, etc.), and I do not expect that this will be the main use case for dask-sql users. So, a good decision on your side!

I also refactored the assign method in dask_sql/datacontainer.py

That is really cool, because I have also seen that this is a major performance bottleneck. I have already tried to reduce the number of usages of the assign method, but I would be very happy to also get your optimizations in. I haven't checked how "smart" the renaming function is with cyclic renames, but apart from that I see no problem.
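On the cyclic-rename question: at least at the pandas level, a rename mapping is applied to each original label independently, so even a swap is safe. A tiny sketch (whether the refactored assign in dask_sql/datacontainer.py relies on exactly this is for the PR to confirm):

```python
import pandas as pd

df = pd.DataFrame({"a": [1], "b": [2]})

# rename() only relabels, and the mapping is applied to the *original*
# labels, so a cyclic mapping (swapping "a" and "b") behaves correctly:
swapped = df.rename(columns={"a": "b", "b": "a"})
print(list(swapped.columns))  # ['b', 'a'] -- labels swapped, data untouched

# assign() builds a genuinely new column that then has to be dropped,
# which is wasteful when all you want is a rename:
renamed = df.assign(c=df["a"]).drop(columns="a")
print(list(renamed.columns))  # ['b', 'c']
```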

but the lack of documentation for Calcite has made that part very difficult.

Unfortunately, I can definitely agree. Calcite is very, very good at what it does; it just needs a bit more documentation for library users.

If you have some knowledge of Calcite, we would welcome your input

I do not think I can help you much here (I am also just a "user"). The way dask-sql handles materialized queries is by storing the Dask graph as a new table. Because of this, the graph is re-evaluated on every calculation, which means any access to external systems (e.g. files on disk) is done again. However, this does not apply to access to other dask-sql tables (they are "hardcoded" into the graph), so that might or might not be what you need.
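To illustrate what storing the Dask graph as a new table looks like from the user-facing dask-sql API (the table name and query below are made up for the example):

```python
import pandas as pd
from dask_sql import Context

c = Context()
c.create_table("events", pd.DataFrame({"uid": ["u1", "u1", "u2"], "n": [1, 2, 3]}))

# "Materialize" a query by registering its lazy result as a new table.
# Only the Dask graph is stored: every query against user_totals
# re-evaluates the graph (and would redo any external I/O baked into it).
totals = c.sql("SELECT uid, SUM(n) AS total FROM events GROUP BY uid")
c.create_table("user_totals", totals)

print(c.sql("SELECT * FROM user_totals").compute())
```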

So, as a summary: I am really impressed with the amount of work you have put into this and with the results. I would be very happy to have the two plugins you implemented in dask-sql, and maybe also the optimization for assign. If you have some time, you could check whether the newly implemented join function in dask-sql also covers your use case, but this is not a must.
